跳到主要内容

Reactive Stream 响应流

转载自:JDK9特性-Reactive Stream 响应式流

Reactive Stream 概念

Reactive Stream(响应式流/反应流)是 JDK9 引入的一套标准,是一套基于 发布/订阅模式 的数据处理规范。 它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。

更确切地说,Reactive 流目的是 “找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞 背压 方式实现数据的异步流”。

背压(back pressure)概念

如果生产者发出的信息比消费者能够处理消息最大量还要多,消费者可能会被迫一直在抓消息,耗费越来越多的资源,埋下潜在的崩溃风险。为了防止这一点,需要有一种机制 使消费者可以通知生产者,降低消息的生成速度。生产者可以采用多种策略来实现这一要求,这种机制称为背压。

简单来说就是

  • 背压指的发布者和订阅者之间的互动
  • 订阅者可以告诉发布者自己需要多少数据,可以调节数据流量,不会导致发布者发布数据过多导致数据浪费或压垮订阅者

JDK9中 Reactive Stream规范的实现

JDK9 中 Reactive Stream 的实现规范 通常被称为 Flow API ,通过 java.util.concurrent.Flow 和 java.util.concurrent.SubmissionPublisher 类来实现响应式流

在 JDK9 里 Reactive Stream 的主要接口声明在 Flow 类里,Flow 类中定义了四个嵌套的静态接口,用于建立流量控制的组件,发布者在其中生成一个或多个供订阅者使用的数据项:

  • Publisher:数据项发布者、生产者
  • Subscriber:数据项订阅者、消费者
  • Subscription:发布者与订阅者之间的关系纽带,订阅令牌
  • Processor:数据处理器

发布者 Publisher

Publisher 将数据流发布给注册的 Subscriber。 它通常使用 Executor 异步发布项目给订阅者。 Publisher 需要确保每个订阅的 Subscriber 方法严格按顺序调用。

subscribe:订阅者订阅发布者

@FunctionalInterface 
public static interface Flow.Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}

订阅者 Subscriber

Subscriber 订阅 Publisher 的数据流,并接受回调。 如果 Subscriber 没有发出请求,就不会收到数据。对于给定 订阅合同(Subscription),调用 Subscriber 的方法是严格按顺序的。

  • onSubscribe:发布者调用订阅者的这个方法来异步传递订阅 , 这个方法在 publisher.subscribe 方法调用后被执行
  • onNext:发布者调用这个方法传递数据给订阅者
  • onError:当 Publisher 或 Subscriber 遇到不可恢复的错误时调用此方法,之后不会再调用其他方法
  • onComplete:当数据已经发送完成,且没有错误导致订阅终止时,调用此方法,之后不再调用其他方法

订阅合同 Subscription

Subscription 用于连接 Publisher 和 Subscriber。Subscriber 只有在请求时才会收到项目,并可以通过 Subscription 取消订阅。Subscription 主要有两个方法:

  • request:订阅者调用此方法请求数据
  • cancel:订阅者调用这个方法来取消订阅,解除订阅者与发布者之间的关系
public static interface Flow.Subscription {
public void request(long n);
public void cancel();
}

处理器 Processor

Processor 位于 Publisher 和 Subscriber 之间,用于做数据转换。可以有多个 Processor 同时使用,组成一个处理链,链中最后一个处理器的处理结果发送给 Subscriber。JDK 没有提供任何具体的处理器。处理器同时是订阅者和发布者,接口的定义也是继承了两者 即作为订阅者也作为发布者 ,作为订阅者接收数据,然后进行处理,处理完后作为发布者,再发布出去。

(Flow API )规范调用流程

Publisher 是能够发出元素的发布者,Subscriber 是接收元素并做出响应的订阅者。当执行 Publisher 里的 subscribe 方法时,发布者会回调订阅者的 onSubscribe 方法,这个方法中,通常订阅者会借助传入的 Subscription 向发布者请求 n 个数据。然后发布者通过不断调用订阅者的 onNext 方法向订阅者发出最多 n 个数据。如果数据全部发完,则会调用 onComplete 告知订阅者流已经发完;如果有错误发生,则通过 onError 发出错误数据,同样也会终止流。

其中,Subscription 相当于是连接 Publisher 和 Subscriber 的 “纽带”。因为当发布者调用 subscribe 方法注册订阅者时,会通过订阅者的回调方法 onSubscribe 传入 Subscription 对象,之后订阅者就可以使用这个 Subscription 对象的 request 方法向发布者 “要” 数据了。背压机制正是基于此来实现的。

面简单描述一下上面提到的各个方法的执行顺序:

  • 创建发布者和订阅者,分别是 Publisher 和 Subscriber 的实例;
  • 订阅者调用发布者的 subscribe 进行订阅;
  • 发布者调用订阅者的 onSubscribe 传递订阅 Subscription;
  • 订阅者调用 Subscription 的 request 方法请求数据;
  • 发布者调用订阅者的 onNext 方法传递数据给订阅者;
  • 数据传递完成后发布者调用订阅者的 onComplete 方法通知完成;

响应流使用示例

public class FlowDemo {

public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();

// 2. 定义订阅者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);

// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者已经达到了目标, 可以调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();

// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}

};

// 3. 发布者和订阅者 建立订阅关系
publiser.subscribe(subscriber);

// 4. 生产数据, 并发布
// 这里忽略数据生产过程
for (int i = 0; i < 3; i++) {
System.out.println("生成数据:" + i);
// submit是个block方法
publiser.submit(i);
}

// 5. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();

// 主线程延迟停止, 否则数据没有消费就会退出
Thread.currentThread().join(1000);
// debug的时候, 下面这行需要有断点
// 否则主线程结束无法debug
System.out.println();
}
}

运行结果如下:

上文中提到过可以调节发布者的数据产出速度,那么这个速度是如何调节的呢?关键就在于 submit 方法,该方法是一个阻塞方法。需要先说明的是 SubmissionPublisher 里有一个数据缓冲区,用于缓冲发布者产生的数据,而这个缓冲区是利用一个 Object 数组实现的,缓冲区最大长度为 256。我们可以在 onSubscribe 方法里打上断点,查看到这个缓冲区:

当这个缓冲区的数据满了之后,submit 方法就会进入阻塞状态,发布者数据的产生速度就会变慢,以此实现调节发布者的数据产出速度。

结合 Processor 的使用方式

/**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String> {

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);

// 过滤掉小于0的, 然后发布出去
if (item > 0) {
this.submit("转换后的数据:" + item);
}

// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();

// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();
}

}

/**
* 带 process 的 flow demo
* @author 01
*/
public class FlowDemo2 {

public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();

// 2. 定义处理器, 对数据进行过滤, 并转换为String类型
MyProcessor processor = new MyProcessor();

// 3. 发布者 和 处理器 建立订阅关系
publiser.subscribe(processor);

// 4. 定义最终订阅者, 消费 String 类型数据
Subscriber<String> subscriber = new Subscriber<>() {

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(String item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);

// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();

// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}

};

// 5. 处理器 和 最终订阅者 建立订阅关系
processor.subscribe(subscriber);

// 6. 生产数据, 并发布
// 这里忽略数据生产过程
publiser.submit(-111);
publiser.submit(111);

// 7. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();

// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
}
}

运行结果如下: